本次實驗參考範例來自於官方 Repo,更正了裡面的一些小 Bug 及更改最後的 Vocoder。
解壓縮後的資料結構應是如此,每個資料夾有約 100 多首語音不等
wavs-
p225
...
...
p314
import os
import pickle
import numpy as np
import soundfile as sf
import librosa
from scipy import signal
from scipy.signal import get_window
from librosa.filters import mel
from librosa.util import normalize
from numpy.random import RandomState
# Root directory holding the raw speaker wav folders (one subfolder per speaker)
rootDir = './wavs'
# Output directory where the mel spectrograms will be saved
targetDir = './spmel'
# First walk level: dirName is rootDir itself, subdirList are the speaker folders
dirName, subdirList, _ = next(os.walk(rootDir))
print('Found directory: %s' % dirName)
def mel_gan_handler(x, fft_length = 1024, hop_length = 256, sr = 22050):
    """Convert a 1-D waveform into a MelGAN-style log10 mel spectrogram.

    Args:
        x: 1-D float waveform, assumed already resampled to `sr`.
        fft_length: STFT window / FFT size.
        hop_length: STFT hop size.
        sr: sample rate used to build the mel filterbank.

    Returns:
        float32 np.ndarray of shape (80, n_frames): log10 mel spectrogram,
        floored at 1e-5 before the log.
    """
    wav = normalize(x)
    # Manual reflect padding because the STFT below uses center=False
    # (this matches the MelGAN vocoder's framing convention).
    p = (fft_length - hop_length) // 2
    wav = np.squeeze(np.pad(wav, (p, p), "reflect"))
    fft = librosa.stft(
        wav,
        n_fft = fft_length,
        hop_length = hop_length,
        window = 'hann',
        center = False
    )
    # abs() of the complex STFT is the magnitude: sqrt(re**2 + im**2)
    mag = abs(fft)
    # Bug fix: the filterbank's n_fft must match the STFT's fft_length.
    # It was hard-coded to 1024, which silently broke any other fft_length.
    mel_basis = mel(sr, fft_length, fmin = 0.0, fmax=None, n_mels=80)
    mel_output = np.dot(mel_basis, mag)
    # Floor at 1e-5 to avoid log10(0), then log-compress.
    log_mel_spec = np.log10(np.maximum(1e-5, mel_output)).astype(np.float32)
    return log_mel_spec
# VCTK is recorded at 48 kHz; we must resample to 22.05 kHz first
new_rate = 22050
for subdir in sorted(subdirList):
    # Mirror the speaker folder layout under targetDir
    if not os.path.exists(os.path.join(targetDir, subdir)):
        os.makedirs(os.path.join(targetDir, subdir))
    _,_, fileList = next(os.walk(os.path.join(dirName,subdir)))
    for fileName in sorted(fileList):
        x, fs = sf.read(os.path.join(dirName,subdir,fileName))
        # Important !!!
        # change sample rate from 48000 -> 22050
        # Since mel_gan use 22050
        x = librosa.resample(x,fs,new_rate)
        S = mel_gan_handler(x)
        # NOTE(review): fileName[:-5] strips a 5-character extension
        # (e.g. ".flac"); for ".wav" files it would also eat one stem
        # character — confirm the dataset's file extension.
        np.save(os.path.join(targetDir, subdir, fileName[:-5]),
                S.astype(np.float32), allow_pickle=False)
    print(f"Done --- {subdir}")
結束之後根目錄會多一個資料夾名為 spmel,裏頭的結構跟 wavs 的一模一樣
你可以讀取裏頭的 .npy 然後再參考 Day8 的方法來確定一下是否能成功轉回語音
下載 Pre-trained Model 後定義 D_VECTOR,這裡用的是 LSTM 版的
import torch
import torch.nn as nn
class D_VECTOR(nn.Module):
    """Speaker-embedding (d-vector) network.

    An LSTM stack reads a mel-spectrogram sequence; the final time step's
    hidden state is projected to `dim_emb` and L2-normalized, yielding a
    unit-length speaker embedding.
    """

    def __init__(self, num_layers=3, dim_input=40, dim_cell=256, dim_emb=64):
        super(D_VECTOR, self).__init__()
        # Attribute names must stay `lstm` / `embedding` so pre-trained
        # state_dicts load without key remapping.
        self.lstm = nn.LSTM(input_size=dim_input, hidden_size=dim_cell,
                            num_layers=num_layers, batch_first=True)
        self.embedding = nn.Linear(dim_cell, dim_emb)

    def forward(self, x):
        """Map (batch, time, dim_input) -> unit-norm (batch, dim_emb)."""
        self.lstm.flatten_parameters()
        outputs, _ = self.lstm(x)
        # Keep only the last time step of the top LSTM layer.
        final_state = outputs[:, -1, :]
        raw_emb = self.embedding(final_state)
        # L2-normalize so embeddings live on the unit hypersphere.
        return raw_emb / raw_emb.norm(p=2, dim=-1, keepdim=True)
接著要 Load_state_dict,注意 num_uttrs 這個參數
import os
import pickle
from model_bl import D_VECTOR
from collections import OrderedDict
import numpy as np
import torch
# Speaker encoder: 80-dim mel frames in, 256-dim d-vector out (on CPU).
C = D_VECTOR(dim_input=80, dim_cell=768, dim_emb=256).eval().cpu()
c_checkpoint = torch.load('3000000-BL.ckpt',map_location=torch.device('cpu'))
new_state_dict = OrderedDict()
for key, val in c_checkpoint['model_b'].items():
    # The checkpoint was saved from nn.DataParallel; drop the 7-char
    # "module." prefix so keys match a plain module's state_dict.
    new_key = key[7:]
    new_state_dict[new_key] = val
C.load_state_dict(new_state_dict)
# How many utterances (different contents) each speaker contributes —
# keep the per-speaker counts roughly equal; content may differ.
num_uttrs = 68
# Crop length (in mel frames) fed to the speaker encoder.
len_crop = 176
# Directory containing mel-spectrograms
rootDir = './spmel'
dirName, subdirList, _ = next(os.walk(rootDir))
print('Found directory: %s' % dirName)
def pad_along_axis(array: np.ndarray, target_length: int, axis: int = 0):
    """Zero-pad `array` along `axis` up to `target_length`.

    Returns the array unchanged when it is already long enough;
    otherwise appends zeros on the trailing side of `axis` only.
    """
    shortfall = target_length - array.shape[axis]
    if shortfall <= 0:
        # Already at or past the target — nothing to do.
        return array
    widths = [(0, 0) for _ in range(array.ndim)]
    widths[axis] = (0, shortfall)
    return np.pad(array, pad_width=widths, mode='constant', constant_values=0)
# Build train.pkl: one entry per speaker of the form
# [speaker_id, mean_d_vector, relpath_1, relpath_2, ...]
speakers = []
# NOTE(review): subdirList is sliced BEFORE sorting, so one arbitrary
# directory is skipped — presumably meant to skip a junk/hidden folder;
# confirm against the actual contents of ./spmel.
for speaker in sorted(subdirList[1:]):
    print('Processing speaker: %s' % speaker)
    utterances = []
    utterances.append(speaker)
    _, _, fileList = next(os.walk(os.path.join(dirName, speaker)))
    # Bug fix: check BEFORE truncating — the old code truncated first,
    # which made the assert vacuous for speakers with too few files.
    assert len(fileList) >= num_uttrs
    fileList = fileList[:num_uttrs]
    # make speaker embedding: average d-vectors over num_uttrs utterances
    idx_uttrs = np.random.choice(len(fileList), size=num_uttrs, replace=False)
    embs = []
    for i in range(num_uttrs):
        tmp = np.load(os.path.join(dirName, speaker, fileList[idx_uttrs[i]]))
        if tmp.shape[0] <= len_crop:
            # Bug fix: pad up to len_crop. The old code passed the pad
            # *amount* as the target length, so short utterances were
            # usually returned unpadded.
            tmp = pad_along_axis(tmp, len_crop)
            melsp = torch.from_numpy(tmp[np.newaxis, :, :])
        else:
            # Long enough: take a random len_crop-frame window.
            left = np.random.randint(0, tmp.shape[0] - len_crop)
            melsp = torch.from_numpy(tmp[np.newaxis, left:left + len_crop, :])
        # Bug fix: keep inputs on CPU — C was loaded with .cpu(), so the
        # old .cuda() inputs raised a device-mismatch error.
        emb = C(melsp)
        embs.append(emb.detach().squeeze().cpu().numpy())
    utterances.append(np.mean(embs, axis=0))
    for fileName in sorted(fileList):
        utterances.append(os.path.join(speaker, fileName))
    speakers.append(utterances)
with open(os.path.join(rootDir, 'train.pkl'), 'wb') as handle:
    pickle.dump(speakers, handle)
最後會在 ./spmel 裡生成一個 train.pkl 看起來是這樣
跟官方的一模一樣
from torch.utils import data
import torch
import numpy as np
import pickle
import os
from multiprocessing import Process, Manager
class Utterances(data.Dataset):
    """Dataset class for the Utterances dataset.

    Reads `train.pkl` (list of per-speaker entries of the form
    [speaker_id, speaker_embedding, mel_path_1, mel_path_2, ...]) and
    loads all mel-spectrograms into memory up front.
    """
    def __init__(self, root_dir, len_crop):
        """Initialize and preprocess the Utterances dataset."""
        self.root_dir = root_dir
        self.len_crop = len_crop
        # Number of metadata entries handled by each worker process.
        self.step = 10
        metaname = os.path.join(self.root_dir, "train.pkl")
        meta = pickle.load(open(metaname, "rb"))
        """Load data using multiprocessing"""
        # Manager lists are shared across processes so workers can fill
        # `dataset` in place at their own offsets.
        manager = Manager()
        meta = manager.list(meta)
        dataset = manager.list(len(meta)*[None])
        processes = []
        for i in range(0, len(meta), self.step):
            # Each process loads a chunk of `step` speakers, writing its
            # results at index offset `i` to preserve ordering.
            p = Process(target=self.load_data,
                        args=(meta[i:i+self.step],dataset,i))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
        self.train_dataset = list(dataset)
        self.num_tokens = len(self.train_dataset)
        print('Finished loading the dataset...')
    def load_data(self, submeta, dataset, idx_offset):
        """Worker: replace mel paths with loaded arrays for a chunk of speakers."""
        for k, sbmt in enumerate(submeta):
            uttrs = len(sbmt)*[None]
            for j, tmp in enumerate(sbmt):
                if j < 2: # fill in speaker id and embedding
                    uttrs[j] = tmp
                else: # load the mel-spectrograms
                    uttrs[j] = np.load(os.path.join(self.root_dir, tmp))
            dataset[idx_offset+k] = uttrs
    def __getitem__(self, index):
        """Return (mel_crop, speaker_embedding) for speaker `index`."""
        # pick a random speaker
        dataset = self.train_dataset
        list_uttrs = dataset[index]
        # entry layout: [speaker_id, speaker_embedding, mel_1, mel_2, ...]
        emb_org = list_uttrs[1]
        # pick random uttr with random crop
        a = np.random.randint(2, len(list_uttrs))
        tmp = list_uttrs[a]
        if tmp.shape[0] < self.len_crop:
            # Too short: zero-pad the time axis up to len_crop.
            len_pad = self.len_crop - tmp.shape[0]
            uttr = np.pad(tmp, ((0,len_pad),(0,0)), 'constant')
        elif tmp.shape[0] > self.len_crop:
            # Too long: take a random len_crop-frame window.
            left = np.random.randint(tmp.shape[0]-self.len_crop)
            uttr = tmp[left:left+self.len_crop, :]
        else:
            uttr = tmp
        return uttr, emb_org
    def __len__(self):
        """Return the number of spkrs."""
        return self.num_tokens
def get_loader(root_dir, batch_size=2, len_crop=176, num_workers=0):
    """Build and return a data loader.

    Wraps an `Utterances` dataset in a shuffling DataLoader that drops the
    last incomplete batch. Each worker re-seeds NumPy from the torch seed
    so random crops differ across workers.
    """
    def worker_init_fn(x):
        np.random.seed((torch.initial_seed()) % (2**32))

    dataset = Utterances(root_dir, len_crop)
    return data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        drop_last=True,
        worker_init_fn=worker_init_fn,
    )
使用的時候你只需要
vcc_loader = get_loader('./spmel', BATCH_SIZE, LEN_CROP)
for j in range(step):
try:
x_real, emb_org = next(data_iter)
    except (NameError, StopIteration):  # first pass: data_iter undefined; later: iterator exhausted
data_iter = iter(vcc_loader)
x_real, emb_org = next(data_iter)
###
train model here
###
就可以了
明天繼續努力!!!